from airflow.models.dag import DAG
from airflow.utils.dates import days_ago
import  datetime
import pendulum
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from dagen.operators import SSHOperator

# Timezone object used to make the DAG's start_date timezone-aware.
local_tz = pendulum.timezone("Asia/Shanghai")

# Jinja expression, kept unrendered here: the run's execution date converted to
# Asia/Shanghai and formatted as YYYYMMDD. It is substituted into the job
# commands below as the `pt` hivevar.
pt = "{{ execution_date.in_tz('Asia/Shanghai').format('YYYYMMDD') }}"

# Tags for the charging domain.
# Schedule (cron expression): every day at 02:00.
schedule_interval = "0 2 * * *"

# Arguments handed to the DAG as `default_args`.
default_args = {
    "owner": "hgc",  # owner name
    "depends_on_past": False,  # whether a run depends on the state of its own previous run
    "email": ["guanghu@tesla.com"],  # notification recipients
    "email_on_failure": True,  # send mail when a task fails
    "email_on_retry": True,  # send mail when a task is retried
    "retries": 2,  # number of retries after a failure
}
# DAG object that all tasks in this file belong to. Catchup is disabled and
# the start date is timezone-aware (local_tz).
dag = DAG(
    dag_id="cdp_charge_detail_gen",
    catchup=False,
    start_date=datetime.datetime(2021, 10, 21, tzinfo=local_tz),
    schedule_interval=schedule_interval,
    default_args=default_args,
)


# Shell commands executed by the two SSH tasks below. Each one starts a named
# job through job.sh and forwards the `pt` template as a hivevar.
ods2dwd = (
    "bash -x /mnt/app/eden-dwh/eden-dwh-cdp-tagging/bin/job.sh start"
    " --job-name dwd_cdp_chargerdb_charging_behavior_overwrite"
    f' --sql-params "--hivevar pt={pt}"'
)
dwd2dws = (
    "bash -x /mnt/app/eden-dwh/eden-dwh-cdp-tagging/bin/job.sh start"
    " --job-name dws_cdp_agg_chargerdb_charging_behavior_overwrite"
    f' --sql-params "--hivevar pt={pt}"'
)


# First job task: runs the `ods2dwd` command through the project's SSHOperator
# using the `eden_yarn_stg` connection.
cdp_charge_detail_gen_dwd = SSHOperator(
    task_id="cdp_charge_detail_gen_dwd",
    ssh_conn_id="eden_yarn_stg",
    command=ods2dwd,
    dag=dag,
)

# Second job task: runs the `dwd2dws` command through the project's SSHOperator
# using the `eden_yarn_stg` connection.
cdp_charge_detail_gen_dws = SSHOperator(
    task_id="cdp_charge_detail_gen_dws",
    ssh_conn_id="eden_yarn_stg",
    command=dwd2dws,
    dag=dag,
)

# Marker pointing at task `child_task3` of DAG `cdp_integration_dag_gen`.
# It is the last task in this DAG's dependency chain.
#
# The DAG is passed explicitly, like for every other task in this file.
# Airflow applies a DAG's default_args (owner, retries, e-mail settings) when
# the operator is constructed, so a task that only gets attached later via
# `>>` would not pick them up.
par_task3 = ExternalTaskMarker(
    task_id="par_task3",
    external_dag_id="cdp_integration_dag_gen",
    external_task_id="child_task3",
    dag=dag,
)

# Gate for the whole pipeline: waits on task `check_one_id_par_task003` of the
# external DAG `cdp_create_oneid` before any job task runs.
check_one_id_task003 = ExternalTaskSensor(
    dag=dag,
    task_id="check_one_id_task003",
    external_dag_id="cdp_create_oneid",  # id of the DAG being watched
    external_task_id="check_one_id_par_task003",  # specific task being watched
    # Look at the external run whose execution date is one hour earlier than
    # this run's.
    execution_delta=datetime.timedelta(hours=1),
    allowed_states=["success"],
    failed_states=["failed", "skipped"],
    check_existence=True,
    # NOTE(review): sensor timeouts are expressed in seconds, so this gives up
    # after one minute - confirm that such a short wait is intended.
    timeout=60,
)

# Execution order: external-task sensor -> DWD job -> DWS job -> external marker.
(
    check_one_id_task003
    >> cdp_charge_detail_gen_dwd
    >> cdp_charge_detail_gen_dws
    >> par_task3
)